import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

/*
 * A {@link Trigger} that fires once the count of elements in a pane reaches the given count.
 *
 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
 */


//下面的这个可以理解为:
//TimeCountTrigger的泛型是Window的子类。
//泛型类就是把泛型定义在类上，用户使用该类的时候，才把类型明确下来
//Reference:
//https://www.zhihu.com/question/272185241


//代码中自定义Trigger的目的是：
//keyby下过来一条就触发窗口统计，如果没消息过来，按60s触发一次窗口。

@PublicEvolving
public class TimeCountTrigger<W extends Window> extends Trigger<Object, W>
{
    private static final long serialVersionUID = 1L;

    private final long maxCount;
    private final long interval;//这里的interval就是1分钟(60s)，是在顶层调用的入口传入的

    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count",     new Sum(), LongSerializer.INSTANCE);
            // 计数统计

    private final ReducingStateDescriptor<Long> timeStateDesc =
            new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);
            // 上一次的触发时间

    private TimeCountTrigger(long maxCount, long interval) {
        this.maxCount = maxCount;
        this.interval = interval;
    }


//因为是两个触发条件，
//    所以需要两个函数返回TriggerResult.FIRE
//    下面的TriggerResult onElement就是对“每条数据触发一次”这个触发条件的实现
//    onElement是对每个元素的处理
    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception
    {
        System.out.println("onMaxCount ....");
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);//读取文件中的触发时间状态
        timestamp = ctx.getCurrentProcessingTime();


// 下面这个if操作存储的是触发时间
        if (fireTimestamp.get() == null)//如果checkpoint文件中不存在触发时间状态(所以这个地方其实是为了进行初始化)
        {
            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = start + interval;//窗口触发计算时间戳
/*
对于上面这两行代码的举例:
11:56:37-11:56:37%60s+60s
=11:56:37-37s+60s
=11:57:00
也就是说下次触发时间是11:57:00
*/
            ctx.registerProcessingTimeTimer(nextFireTimestamp);//定时器对下个触发时刻点进行倒计时(所以是倒计时，并非定时)
//        注册定时器的意思是，注册之后开始倒计时，如果倒计时结束，会自动调用onProcessingTime函数


            fireTimestamp.add(nextFireTimestamp);//在checkpoint文件中保存下次"窗口触发计算"的时刻点



        }
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);//读取文件中的计数状态
        count.add(1L);//对计数状态进行加1



//  下面这个其实是if-else结构，
//   意思是，如果达到最大计数，窗口触发计数
//        如果没有达到，就什么也不干，继续等待。
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }



    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx)
    {
        return TriggerResult.CONTINUE;//啥也不干
    }


    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception
    {
        System.out.println("onProcessingTime ....");
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);//获取checkpoint中的时间状态
        

        if (fireTimestamp.get().equals(time))
            //这句话的意思是，如果状态文件中的触发时间戳等于当前时间，则开始准备触发计算
        {
            fireTimestamp.clear();//清除当前将要触发的时间戳。
            fireTimestamp.add(time + interval);//加入下次触发计算的时间戳。
            ctx.registerProcessingTimeTimer(time + interval);
            //记载下次触发的时间，interval这里是一分钟,并且通知定时器开始对下次触发进行倒计时。
            return TriggerResult.FIRE;//触发本次计算
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception
    {
        System.out.println("clear ....");
        ctx.getPartitionedState(stateDesc).clear();//清除分区状态所存储的文件(stateDesc)中的内容
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);
        long timestamp = fireTimestamp.get();
        ctx.deleteProcessingTimeTimer(timestamp);
        fireTimestamp.clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception
    {
        ctx.mergePartitionedState(stateDesc);
        ctx.mergePartitionedState(timeStateDesc);
    }

    @Override
    public String toString()
    {
        return "TimeCountTrigger(" + maxCount + ")";
    }

    /*
     * Creates a trigger that fires once the number of elements in a pane reaches the given count.
     *
     * @param maxCount The count of elements at which to fire.
     * @param <W>      The type of {@link Window Windows} on which this trigger can operate.
     */

//    输入maxCount,interval
//    返回类型是
    public static <W extends Window> TimeCountTrigger<W> of(long maxCount, Time interval)
    {
        return new TimeCountTrigger<>(maxCount, interval.toMilliseconds());
    }





//    实现聚合函数
    private static class Sum implements ReduceFunction<Long>
    {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception
        {
            return value1 + value2;
        }

    }

    private static class Min implements ReduceFunction<Long>
    {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception
        {
            return Math.min(value1, value2);
        }
    }
}
//代码来自:
//https://blog.csdn.net/baifanwudi/article/details/102937381


/*
总结:
上述这一大堆函数如何协同工作来实现两个触发条件呢？
主干其实是两个函数:
①onElement注册processing定时器触发条件以及个数触发条件
②当processing定时器倒计时为零时，调用onProcessingTime触发计算
③这类代码
如何快速抓住关键呢？
看代码中TriggerResult.FIRE所在位置，
有几个触发条件，就有几处TriggerResult.FIRE
*/